import pandas as pd

# Load both sheets of the fermentation workbook.
# The original opened a raw file handle and never closed it, then re-read the
# same handle for the second sheet; pd.ExcelFile parses the workbook once and
# the context manager guarantees the handle is closed.
with pd.ExcelFile('../data/Anonymized_Fermentation_Data_final.xlsx') as xls:
    data = pd.read_excel(xls, sheet_name='data')        # measurements, one row per fermentation run
    meta_data = pd.read_excel(xls, sheet_name='meta data')  # variable descriptions
# Absolute_Interval. The variables are described in the meta_data and are
# comprised of a variety of fermentation process (meta data), physiological
# and biochemical parameters (independent).
# Modeling targets (meta_data.query('target == 1')) include:
#  - Run_Execution: was the fermentation process executed as planned (yes/no == alpha/beta)?
#  - Run_Performance: was the strain viable during the fermentation process (yes/no == gamma/delta)?
#  - Product_Produced__g: how much product molecule was produced (grams)
#  - Titer_End__g_over_kg: how much product was produced relative to the input feedstock (grams/kilograms)
data.head()
meta_data.head()
# basic overview of the dataset dimensions and strain replication
print('The data contains {} samples and {} variables'.format(data.shape[0],data.shape[1]))
print('There are {} unique strains which are replicated or measured under different fermentation conditions.'.format(data['Strain'].nunique()))
print('The variables are comprised of a variety of fermentation process, physiological and biochemical parameters:')
# `display` is an IPython/notebook builtin; this file is a notebook export
display(meta_data['variable type'].value_counts())
print('Predictive modeling targets include: {}'.format(meta_data.query('target == 1').name.values))
# class balance of the two categorical outcome variables
display(data[['Run_Execution', 'Run_Performance']].apply(lambda x: x.value_counts()))
#prepare data for analysis
# split out numeric from categorical variables
# keep variables whose metadata type is physiological/biochemical/process...
var_type_filter = [x in ['physiological','biochemical','process'] for x in meta_data['variable type']]
# ...and whose pandas dtype is numeric
var_dtype_filter = (data.dtypes == 'float64') | (data.dtypes == 'int64')
# NOTE(review): elementwise AND assumes meta_data rows align 1:1 with data columns — confirm
numeric_vars = (var_type_filter & var_dtype_filter).values
data[data.columns[numeric_vars]]
# meta_data.query('name in {}'.format(data.columns[numeric_vars]))
# show metadata for only the selected numeric variables
meta_data.query('name in {}'.format(list(data.columns[numeric_vars].values)))
#prepare data for analysis (repeats the exploratory selection in the cell above)
var_type_filter = [x in ['physiological','biochemical','process'] for x in meta_data['variable type']]
var_dtype_filter = (data.dtypes == 'float64') | (data.dtypes == 'int64')
numeric_vars = (var_type_filter & var_dtype_filter).values
# numeric predictor matrix used for modeling
numeric_x_data = data[data.columns[numeric_vars]]
#things to try to predict
y_data = data[data.columns[(meta_data['target'] == 1).values]]
#meta data about variables
# NOTE(review): this overwrites meta_data with the filtered, name-indexed subset;
# later cells rely on the re-indexed version
meta_data = meta_data.query('name in {}'.format(list(data.columns[numeric_vars].values))).set_index('name')
#Variables which will be used to build the model
data.columns[numeric_vars].values
# train, validate and test datasets
from sklearn.model_selection import train_test_split
# the classification target modeled below
model_target = 'Run_Performance'
#maintain class balance via stratified splitting; 75/25 train/test
X_train, X_test, y_train, y_test = train_test_split(numeric_x_data, y_data, test_size=0.25, stratify = y_data[model_target], random_state=42)
#split train set to create a pseudo test or validation dataset (33% of train)
X_train, X_validate, y_train, y_validate = train_test_split(X_train, y_train, test_size=0.33, stratify= y_train[model_target], random_state=42)
print('The training, validation and test data contain {}, {} and {} rows respectively'.format(len(X_train),len(X_validate),len(X_test)))
# Simple sklearn preprocessing pipeline for the numeric features:
# median-impute missing values, then autoscale to zero mean / unit variance.
from sklearn.pipeline import Pipeline
from sklearn.impute import SimpleImputer
from sklearn.preprocessing import StandardScaler
import numpy as np

# median imputation for missing values
imputer = SimpleImputer(missing_values=np.nan, strategy='median')
# standard (auto) scaling
scaler = StandardScaler()
steps = [('imputer', imputer), ('scaler', scaler)]
pipe = Pipeline(steps)
# fit the pipeline on the training split and transform it
X_train_scaled = pipe.fit_transform(X_train)
# PCA of the scaled training data
from sklearn.decomposition import PCA
# 3-component PCA fit on the imputed + scaled training features
pca = PCA(n_components=3)
pca_result = pca.fit_transform(X_train_scaled)
#collect results
def get_results(res, prefix='', ncol=3, add=None):
    """Collect the first `ncol` columns of a 2-D array into a DataFrame.

    Columns are named ``prefix + "1"`` through ``prefix + str(ncol)``.
    When `add` is supplied it is concatenated column-wise (aligned on
    index) onto the result.
    """
    labels = [prefix + str(c + 1) for c in range(ncol)]
    out = pd.DataFrame({lab: res[:, c] for c, lab in enumerate(labels)})
    if add is not None:
        out = pd.concat([out, add], axis=1)
    return out
# PCA scores joined with the training targets (index reset so rows align)
df = get_results(pca_result,'pca-', add = y_train.reset_index())
print('Explained % variation per principal component: {}'.format( (pca.explained_variance_ratio_ *100).round(2)))
df.head()
#view sample scores
import plotly.graph_objects as go
#data for hover
tmp = df
# one customdata row per sample, one column per dataframe column
customdata = np.stack(([tmp[x] for x in tmp]), axis = -1)
#hover text as html: "<b>column</b>: value" line per column
res = []
for i,name in enumerate(tmp.columns):
    res.append('<b>{}</b>: %{{customdata[{}]}}<br>'.format(name,i))
hovertemplate = ''.join(res)
#select variables to show: every column whose name contains the prefix
# NOTE(review): `vars` shadows the Python builtin of the same name
prefix = 'pca'
vars = []
for cols in df.columns:
    if cols.find(prefix) !=-1:
        vars.append(dict(label=cols,values=df[cols]))
#define custom colors: one palette color per class level of the target
import plotly.express as px
color_var = model_target
colors = px.colors.qualitative.Plotly[0:df[color_var].nunique()] # change palette if not discreet
opt_color = []
_color_var = pd.factorize(tmp[color_var])
for i in _color_var[0]:
    opt_color.append(colors[i])
# lower-triangle scatter-plot matrix of the PCA scores, colored by target class
fig = go.Figure(data=go.Splom(dimensions=vars,
                              marker=dict(color=opt_color,opacity=.5),
                              showupperhalf=False,
                              hovertemplate = hovertemplate,
                              customdata = customdata
                              )
                )
fig.update_layout(
    title='PCA scores',
    width=800,
    height=800,
)
fig.show()
# PCA loadings scaled by sqrt(explained variance).
# (The original initialized `pca_loadings = pd.DataFrame()` and aliased it to
# `df` before immediately overwriting `pca_loadings` below — dead code, removed.)
loadings = pca.components_.T * np.sqrt(pca.explained_variance_)
# attach the variable metadata; row order follows the training feature columns
extra = (pd.DataFrame({'name': X_train.columns.tolist()})
         .set_index('name')
         .join(meta_data)
         .reset_index()
         )
pca_loadings = get_results(loadings, 'pca-', add=extra)
pca_loadings.head()
#select variables to show: the pca-* loading columns
df = pca_loadings
vars = []
for cols in df.columns:
    if cols.find('pca') !=-1:
        vars.append(dict(label=cols,values=df[cols]))
# lower-triangle scatter-plot matrix of loadings; hover shows the variable name
fig = go.Figure(data=go.Splom(dimensions=vars,
                              showupperhalf=False,
                              hovertemplate = 'variable:%{text}',
                              text=df['name'],
                              )
                )
fig.update_layout(
    title='PCA loadings',
    width=800,
    height=800,
)
fig.show()
# Identify the n most negative and n most positive loading variables
# along one principal component.
pc = 'pca-2'
n = 3
tmp = pca_loadings.sort_values(pc)
top_vars = tmp['name'].head(n).tolist() + tmp['name'].tail(n).tolist()
### plot top loadings variables raw data
# NOTE(review): this cell and the next are exploratory drafts; the finished
# version of the plot is in the cell that follows, which recomputes `_df`
import plotly.express as px
import math
df = pca_loadings
# long format: one row per (sample, variable) with the target attached
_df = pd.concat([y_train,X_train],axis=1)
_df = _df.melt(id_vars=[model_target])
_df = _df.query('variable in {}'.format(top_vars))
# _df['value'].describe()
# [print(x) for x in _df['value']]
df = pca_loadings
_df = pd.concat([y_train[model_target],X_train],axis=1)
_df = _df.melt(id_vars=[model_target])
_df
### plot top loadings variables raw data
import plotly.express as px
import math
df = pca_loadings
# long format: one row per (sample, variable), with the target class attached
_df = pd.concat([y_train[model_target],X_train],axis=1)
_df = _df.melt(id_vars=[model_target])
# clip negative readings to 0 so the shifted log below is defined
_df['value'] = _df['value'].clip(lower=0)
_df = _df.query('variable in {}'.format(top_vars)).dropna()
# log(x + 1) transform for display on a compressed scale
_df['shifted_log_value'] = [math.log(x+1) for x in _df['value']]
# violin plots per variable, colored by target class (gamma/delta)
fig = px.violin(_df, y="shifted_log_value", x="variable",
                color=color_var, box=True, points="all",
                hover_data=_df.columns,
                color_discrete_map={'delta': colors[0] ,'gamma':colors[1]})
# fig.update_layout( yaxis_type="log")
fig.show()
#prepare data for modeling
#use the pipeline created above
# Fit the imputer/scaler on the TRAINING split only, then apply the same
# fitted transform to the validation split. The original called
# fit_transform on the validation data, leaking validation statistics
# into the preprocessing.
_X_train = pipe.fit_transform(X_train)
_y_train = y_train[model_target]
_X_test = pipe.transform(X_validate)
_y_test = y_validate[model_target]
from sklearn.ensemble import RandomForestClassifier
from sklearn.utils.class_weight import compute_class_weight
from sklearn.model_selection import RandomizedSearchCV
# Simple hyperparameter tuning for a random forest model
estimator = RandomForestClassifier(random_state = 42)
# tune grid
param_grid = {
    'n_estimators': np.linspace(10, 200).astype(int),
    'max_depth': [None] + list(np.linspace(3, 20).astype(int)),
    # 'auto' was removed in scikit-learn >= 1.3; 'sqrt' is the equivalent default
    'max_features': ['sqrt', None] + list(np.arange(0.5, 1, 0.1)),
    'max_leaf_nodes': [None] + list(np.linspace(10, 50, 500).astype(int)),
    # min_samples_split must be >= 2 — the original's 1 makes sampled candidates fail to fit
    'min_samples_split': [2, 5, 10],
    'bootstrap': [True, False],
    'class_weight' : ["balanced", "balanced_subsample"] # RF classifier tends to be biased towards the majority class, place a heavier penalty on misclassifying the minority class
}
# compute_class_weight's arguments are keyword-only since scikit-learn 1.0
print('class weights (1,0): {}'.format(
    compute_class_weight(class_weight='balanced', classes=np.unique(_y_train), y=_y_train)))
# Create the random search model: 10 random candidates, 3-fold CV, ROC-AUC scoring
rs = RandomizedSearchCV(estimator, param_grid, n_jobs = -1,
                        scoring = 'roc_auc', cv = 3,
                        n_iter = 10, verbose = 1, random_state=42)
# Fit the randomized search on the preprocessed training data
rs.fit(_X_train, _y_train)
#select best model
best_model = rs.best_estimator_
# class predictions and positive-class probabilities, train and validation
train_rf_predictions = best_model.predict(_X_train)
train_rf_probs = best_model.predict_proba(_X_train)[:, 1]
rf_predictions = best_model.predict(_X_test)
rf_probs = best_model.predict_proba(_X_test)[:, 1]
#evaluate performance on validation data
import matplotlib.pyplot as plt
from sklearn.metrics import classification_report
from sklearn.metrics import confusion_matrix
# NOTE(review): plot_confusion_matrix was removed in scikit-learn 1.2;
# migrate to ConfusionMatrixDisplay.from_estimator when upgrading
from sklearn.metrics import plot_confusion_matrix
# .round(2) is redundant with the {:.2f} format spec
print('Accuracy of classifier on validation set: {:.2f}'.format(best_model.score(_X_test, _y_test).round(2)))
print(classification_report(_y_test, rf_predictions))
plot_confusion_matrix(best_model, _X_test, _y_test,
                      # display_labels=['no','yes'],
                      cmap=plt.cm.Blues)
#lets rank the predictors
from sklearn.inspection import permutation_importance
# permutation importance measured on the validation split
result = permutation_importance(best_model, _X_test, _y_test, n_repeats=10,
                                random_state=42, n_jobs=2)
sorted_idx = result.importances_mean.argsort()
#show top variables (10 largest mean importances)
top_vars = sorted_idx[-10:]
fig, ax = plt.subplots()
# NOTE(review): labeling assumes meta_data row order matches the feature-matrix
# column order — verify, since meta_data was filtered and re-indexed earlier
ax.boxplot(result.importances[top_vars].T,
           vert=False, labels = meta_data.iloc[top_vars,].index.values)
ax.set_title("Permutation Importances (validation set)")
fig.tight_layout()
plt.show()
#could simplify model by selecting top x variables and re-optimizing
#evaluate performance on test data
# Apply the training-fitted pipeline (transform, not fit_transform — the
# original refit it on the test data, leaking test statistics) and evaluate
# on the TEST arrays. The original computed _X_test2/_y_test2 but then
# evaluated on _X_test/_y_test, i.e. it re-reported the validation scores.
_X_test2 = pipe.transform(X_test)
_y_test2 = y_test[model_target]
rf_predictions = best_model.predict(_X_test2)
rf_probs = best_model.predict_proba(_X_test2)[:, 1]
print('Accuracy of classifier on test set: {:.2f}'.format(best_model.score(_X_test2, _y_test2)))
print(classification_report(_y_test2, rf_predictions))
plot_confusion_matrix(best_model, _X_test2, _y_test2,
                      # display_labels=['no','yes'],
                      cmap=plt.cm.Blues)
# #lets try tpot for fun
#can take a while
# from tpot import TPOTClassifier
# tpot = TPOTClassifier(generations=5, population_size=50, verbosity=2, random_state=42)
# tpot.fit(_X_train, _y_train)
# print(tpot.score(_X_test, _y_test))
# # tpot.export('tpot_example.py')
# best_model = tpot
# #evaluate performance on test data
# _X_test = pipe.fit_transform(X_test)
# _y_test = y_test[model_target]
# rf_predictions = best_model.predict(_X_test)
# rf_probs = best_model.predict_proba(_X_test)[:, 1]
# print('Accuracy of classifier on test set: {:.2f}'.format(best_model.score(_X_test, _y_test).round(2)))
# print(classification_report(_y_test, rf_predictions))
# plot_confusion_matrix(best_model, _X_test, _y_test,
# # display_labels=['no','yes'],
# cmap=plt.cm.Blues)
### Export datasets
# Preprocess each split and write it to CSV. The pipeline is fit on the
# training data only; validation and test are transformed with the
# training-fitted imputer/scaler (the original fit_transform-ed each split,
# leaking each split's own statistics into its preprocessing).
X_train = pipe.fit_transform(X_train)
y_train = y_train[model_target]
X_validate = pipe.transform(X_validate)
y_validate = y_validate[model_target]
X_test = pipe.transform(X_test)
y_test = y_test[model_target]
save_list = ['X_train','y_train','X_validate','y_validate','X_test','y_test','meta_data']
for name in save_list:
    # call to_csv directly — the original built a command string and eval()'d it
    pd.DataFrame(globals()[name]).to_csv('../data/{}.csv'.format(name))